Solutions/Mimecast/Data Connectors/MimecastAT/SharedCode/utils.py (751 lines of code) (raw):

"""Utils File.""" import inspect import requests import json from json.decoder import JSONDecodeError import hashlib from .state_manager import StateManager from .mimecast_exception import MimecastException from .logger import applogger from . import consts from ..SharedCode.sentinel import post_data from tenacity import ( retry, stop_after_attempt, wait_exponential, retry_if_exception_type, retry_if_result, retry_any, RetryError, ) from requests.exceptions import ConnectionError import datetime def retry_on_status_code(response): """Check and retry based on a list of status codes. Args: response (): API response is passed Returns: Bool: if given status code is in list then true else false """ __method_name = inspect.currentframe().f_code.co_name if isinstance(response, dict): return False if response.status_code in consts.RETRY_STATUS_CODE: applogger.info( "{}(method={}) : Retrying due to status code : {}".format( consts.LOGS_STARTS_WITH, __method_name, response.status_code, ) ) return True return False class Utils: """Utils Class.""" def __init__(self, azure_function_name) -> None: """Init Function.""" self.azure_function_name = azure_function_name self.log_format = consts.LOG_FORMAT self.headers = {} self.first_page = True def check_environment_var_exist(self, environment_var): """Check the existence of required environment variables. Logs the validation process and completion. Raises MimecastException if any required field is missing. Args: environment_var(list) : variables to check for existence """ __method_name = inspect.currentframe().f_code.co_name try: applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Validating Environment Variables.", ) ) missing_required_field = False for var in environment_var: key, val = next(iter(var.items())) if not val: missing_required_field = True applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Environment variable {} is not set.".format(key), ) ) if missing_required_field: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Validation failed.", ) ) raise MimecastException() applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Validation Complete.", ) ) except Exception as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(err), ) ) raise MimecastException() def get_checkpoint_data(self, checkpoint_obj: StateManager, load_flag=True): """Get checkpoint data from a StateManager object. Args: checkpoint_obj (StateManager): The StateManager object to retrieve checkpoint data from. load_flag (bool): A flag indicating whether to load the data as JSON (default is True). Returns: The retrieved checkpoint data. """ __method_name = inspect.currentframe().f_code.co_name try: applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Fetching checkpoint data.", ) ) checkpoint_data = checkpoint_obj.get() if load_flag and checkpoint_data: checkpoint_data = json.loads(checkpoint_data) applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Checkpoint data fetched.", ) ) return checkpoint_data except json.decoder.JSONDecodeError as json_error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.JSON_DECODE_ERROR_MSG.format(json_error), ) ) raise MimecastException() except Exception as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(err), ) ) raise MimecastException() def post_checkpoint_data(self, checkpoint_obj: StateManager, data, dump_flag=True): """Post checkpoint data. It post the data to a checkpoint object based on the dump_flag parameter. Args: checkpoint_obj (StateManager): The StateManager object to post data to. data: The data to be posted. dump_flag (bool): A flag indicating whether to dump the data as JSON before posting (default is True). """ __method_name = inspect.currentframe().f_code.co_name try: if dump_flag: checkpoint_obj.post(json.dumps(data)) else: checkpoint_obj.post(data) applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Checkpoint posted to azure storage.", ) ) except TypeError as type_error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.TYPE_ERROR_MSG.format(type_error), ) ) raise MimecastException() except Exception as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(err), ) ) raise MimecastException() @retry( stop=stop_after_attempt(consts.MAX_RETRIES), wait=wait_exponential( multiplier=consts.BACKOFF_MULTIPLIER, min=consts.MIN_SLEEP_TIME, max=consts.MAX_SLEEP_TIME, ), retry=retry_any( retry_if_result(retry_on_status_code), retry_if_exception_type(ConnectionError), ), before_sleep=lambda retry_state: applogger.error( "{}(method = {}) : Retrying after {} seconds, attempt number : {} ".format( consts.LOGS_STARTS_WITH, " Retry Decorator", retry_state.upcoming_sleep, retry_state.attempt_number, ) ), ) def make_rest_call( self, method, url, params=None, data=None, json=None, check_retry=True ): """Make a rest call. Args: url (str): The URL to make the call to. method (str): The HTTP method to use for the call. params (dict, optional): The parameters to pass in the call (default is None). data (dict, optional): The body(in x-www-form-urlencoded formate) of the request (default is None). json (dict, optional): The body(in row formate) of the request (default is None). check_retry (bool, optional): A flag indicating whether to check for retry (default is True). Returns: dict: The JSON response if the call is successful. """ __method_name = inspect.currentframe().f_code.co_name try: applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Rest Call, Method :{}, url: {}".format(method, url), ) ) response = requests.request( method, url, headers=self.headers, params=params, data=data, json=json, timeout=consts.MAX_TIMEOUT_SENTINEL, ) if response.status_code >= 200 and response.status_code <= 299: response_json = response.json() applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Success, Status code : {}".format(response.status_code), ) ) self.handle_failed_response_for_success(response_json) return response_json elif response.status_code == 400: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Bad Request = {}, Status code : {}".format( response.text, response.status_code ), ) ) self.handle_failed_response_for_failure(response) elif response.status_code == 401: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Unauthorized, Status code : {}".format(response.status_code), ) ) response_json = response.json() fail_json = response_json.get("fail", []) error_code = None error_message = None if fail_json: error_code = fail_json[0].get("code") error_message = fail_json[0].get("message") if check_retry: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Generating new token, Error message = {}, Error code = {}".format( error_message, error_code ), ) ) check_retry = False self.authenticate_mimecast_api(check_retry) return self.make_rest_call( method, url=url, json=json, check_retry=check_retry ) else: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Max retry reached for generating access token," "Error message = {}, Error code = {}".format( error_message, error_code ), ) ) raise MimecastException() elif response.status_code == 403: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Forbidden, Status code : {}".format(response.status_code), ) ) self.handle_failed_response_for_failure(response) elif response.status_code == 404: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Not Found, URL : {}, Status code : {}".format( url, response.status_code ), ) ) raise MimecastException() elif response.status_code == 409: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Conflict, Status code : {}".format(response.status_code), ) ) self.handle_failed_response_for_failure(response) elif response.status_code == 429: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Too Many Requests, Status code : {} ".format( response.status_code ), ) ) return response elif response.status_code == 500: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Internal Server Error, Status code : {}".format( response.status_code ), ) ) return self.handle_failed_response_for_failure(response) applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Unexpected Error = {}, Status code : {}".format( response.text, response.status_code ), ) ) raise MimecastException() except MimecastException: raise MimecastException() except requests.exceptions.Timeout as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.TIME_OUT_ERROR_MSG.format(error), ) ) raise MimecastException() except JSONDecodeError as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.JSON_DECODE_ERROR_MSG.format( "{}, API Response = {}".format(error, response.text) ), ) ) raise MimecastException() except requests.ConnectionError as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.CONNECTION_ERROR_MSG.format(error), ) ) raise ConnectionError() except requests.RequestException as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.REQUEST_ERROR_MSG.format(error), ) ) raise MimecastException() except Exception as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(error), ) ) raise MimecastException() def handle_failed_response_for_failure(self, response): """Handle the failed response for failure status codes. If request get authentication error it will regenerate the access token. Args: response_json (dict): The JSON response from the API. """ __method_name = inspect.currentframe().f_code.co_name try: response_json = response.json() error_message = response_json fail_json = response_json.get("fail", []) error_json = response_json.get("error") if fail_json: error_message = fail_json[0].get("message") elif error_json: error_message = error_json.get("message") applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, error_message, ) ) if response.status_code in consts.EXCEPTION_STATUS_CODE: raise MimecastException() return response except MimecastException: raise MimecastException() except Exception as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(error), ) ) raise MimecastException() def handle_failed_response_for_success(self, response_json): """Handle the failed response for a successful request. Check if there is failure in success response or not. Args: response_json (dict): The JSON response from the request. """ __method_name = inspect.currentframe().f_code.co_name try: fail_json = response_json.get("fail", []) if fail_json: try: error_message = fail_json[0].get("errors")[0].get("message") except (KeyError, IndexError, ValueError, TypeError): error_message = fail_json applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Failed response message = {}.".format(error_message), ) ) raise MimecastException() else: applogger.debug( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "No failed response found.", ) ) return except MimecastException: raise MimecastException() except Exception as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(error), ) ) raise MimecastException() def authenticate_mimecast_api(self, check_retry=True): """Authenticate mimecast endpoint generate access token and update header. Args: check_retry (bool): Flag for retry of generating access token. """ __method_name = inspect.currentframe().f_code.co_name try: body = { "client_id": consts.MIMECAST_CLIENT_ID, "client_secret": consts.MIMECAST_CLIENT_SECRET, "grant_type": "client_credentials", } applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Generating access token.", ) ) self.headers = {} url = "{}{}".format(consts.BASE_URL, consts.ENDPOINTS["OAUTH2"]) response = self.make_rest_call( method="POST", url=url, data=body, check_retry=check_retry ) if "access_token" in response: access_token = response.get("access_token") self.headers.update( { "Content-Type": "application/json", "Authorization": "Bearer {}".format(access_token), } ) applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Successfully generated access token and header updated.", ) ) return applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Error occurred while fetching the access token from the response = {}.".format( response ), ) ) raise MimecastException() except MimecastException: raise MimecastException() except RetryError as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.MAX_RETRY_ERROR_MSG.format( error, error.last_attempt.exception() ), ) ) raise MimecastException() except KeyError as key_error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.KEY_ERROR_MSG.format(key_error), ) ) raise MimecastException() except Exception as error: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(error), ) ) raise MimecastException() def convert_to_hash(self, data_to_hash): """Convert json data records to hash. Args: data_to_hash (list): list of data to hash Returns: list: hashed data list """ __method_name = inspect.currentframe().f_code.co_name hashed_data = [] for record in data_to_hash: try: json_string = json.dumps(record) res_hash = hashlib.sha256( json_string.encode("utf-8", errors="replace") ).hexdigest() hashed_data.append(res_hash) except TypeError as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.TYPE_ERROR_MSG.format(err), ) ) continue except ValueError as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.VALUE_ERROR_MSG.format(err), ) ) continue except Exception as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(err), ) ) continue return hashed_data def compare_data_with_checkpoint(self, data_to_compare, state_manager_obj): """Compare data with checkpoint hash and return unique records. Args: data_to_compare (list): list of data to compare state_manager_obj (Statemanager): statemanager object to get checkpoint hash data Returns: list: list of unique data. """ __method_name = inspect.currentframe().f_code.co_name try: hashed_data = self.convert_to_hash(data_to_compare) checkpoint_hash_list = self.get_checkpoint_data(state_manager_obj, True) if checkpoint_hash_list: applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Found hashed data checkpoint.", ) ) hash_record = { hashed_data[i]: record for i, record in enumerate(data_to_compare) } unique_hashes = set(hashed_data) - set(checkpoint_hash_list) unique_data = [hash_record[hash] for hash in unique_hashes] return unique_data applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Hashed data checkpoint not found.", ) ) return data_to_compare except MimecastException: raise MimecastException() except Exception as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(err), ) ) raise MimecastException() def filter_unique_data_and_post( self, data_to_post, hash_file_state_manager_obj, table_name, checkpoint_data, checkpoint_obj, next_page_token_flag, ): """Filter unique data from the given data and post it to Azure Sentinel Log Analytics. Args: data_to_post (list): The data to be posted to azure Sentinel log analytics. hash_file_state_manager_obj (StateManager): The StateManager object to retrieve hash checkpoint data. table_name (str): The custom log table name in which data will be added. checkpoint_data (dict): The checkpoint data to be updated. checkpoint_obj (StateManager): The StateManager object to post checkpoint data to. next_page_token_flag (bool): A flag indicating whether a next page token is present. Returns: bool: True if checkpoint data is deleted, otherwise False. """ __method_name = inspect.currentframe().f_code.co_name try: if self.first_page: data_to_post = self.compare_data_with_checkpoint( data_to_post, hash_file_state_manager_obj ) self.first_page = False if len(data_to_post) > 0: applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Posting data to azure Sentinel log analytics, data count : {}.".format( len(data_to_post) ), ) ) post_data(json.dumps(data_to_post), log_type=table_name) if not next_page_token_flag: if checkpoint_data: applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "New data found in same last page, so resetting checkpoint", ) ) checkpoint_data.update( {"date": datetime.datetime.utcnow().isoformat()} ) else: checkpoint_data = { "date": datetime.datetime.utcnow().isoformat() } self.post_checkpoint_data(checkpoint_obj, checkpoint_data) return False else: applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Data already posted to sentinel, no new data found.", ) ) return self.compare_last_pagetoken_time( checkpoint_data, checkpoint_obj, hash_file_state_manager_obj ) except MimecastException: raise MimecastException() except Exception as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(err), ) ) raise MimecastException() def compare_last_pagetoken_time( self, checkpoint_data, checkpoint_obj, hash_file_state_manager_obj ): """Compare last page token time with current time. Args: checkpoint_data (dict): checkpoint data checkpoint_obj (StateManager): The StateManager object to post checkpoint data to. hash_file_state_manager_obj (StateManager): The StateManager object to retrieve hash checkpoint data. Returns: checkpoint_data (dict): checkpoint data """ __method_name = inspect.currentframe().f_code.co_name try: if checkpoint_data: last_page_token_time = checkpoint_data.get("date") if ( last_page_token_time and datetime.datetime.utcnow() - datetime.datetime.fromisoformat(last_page_token_time) >= datetime.timedelta(hours=consts.CHECKPOINT_RESET_TIME) ): applogger.info( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, "Last date time more than 12 hours old, so resetting checkpoints", ) ) checkpoint_data = {} self.post_checkpoint_data(checkpoint_obj, checkpoint_data) self.post_checkpoint_data( hash_file_state_manager_obj, checkpoint_data ) return True return False except Exception as err: applogger.error( self.log_format.format( consts.LOGS_STARTS_WITH, __method_name, self.azure_function_name, consts.UNEXPECTED_ERROR_MSG.format(err), ) ) raise MimecastException()